import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
from pyspark.sql import SQLContext
import sys

reload(sys)
# NOTE(review): `reload(sys)` on its own does nothing useful — it is the
# first half of the Py2 idiom that re-exposes `setdefaultencoding` (hidden
# by site.py after startup). This script formats the non-ASCII literal
# '宜昌市' into SQL text below, so the UTF-8 default is actually needed;
# the original was missing the second line.
sys.setdefaultencoding('utf-8')
import xlrd
import os


def extrat(city, dir):
    """Extract up to 120,000 distinct msisdns present in beauty.yichang but
    absent from history.beauty, and dump them to an xlsx file at
    /home/extract/beauty/<city>/<dir>.xlsx.

    NOTE(review): the name looks like a typo for "extract" but is kept —
    it is the public name callers use. `dir` shadows the builtin but is
    likewise kept for caller compatibility. Relies on a module-level
    `sqlContext` (presumably the HiveContext; defined outside this file's
    visible portion — confirm).

    :param city: city sub-directory name, e.g. 'yichang'
    :param dir:  output file basename (without .xlsx)
    """
    # Anti-join: keep msisdns inserted since unix ts 1525104000
    # (2018-05-01 00:00:00 CST) that have no match in history.beauty.
    result_df = sqlContext.sql("""
        SELECT
            DISTINCT  t.msisdn
        FROM (
            select
                msisdn
            FROM
                beauty.yichang
            WHERE unix_timestamp(insert_time) >=1525104000
        ) t
         LEFT OUTER JOIN
         history.beauty t2
         ON t.msisdn = t2.msisdn
                    WHERE t2.msisdn is null
         limit 120000
    """)
    out_path = """/home/extract/beauty/%(city)s/%(dir)s.xlsx""" % {"city": city, "dir": dir}
    writer = pd.ExcelWriter(out_path, engine='xlsxwriter')
    try:
        result_df.toPandas().to_excel(writer, sheet_name='Sheet1')
    finally:
        # BUG FIX: the original never closed the writer on error, leaking
        # the workbook handle; always flush/close it.
        writer.save()

def eachFile(city,city_mean,dir,pt):
    read_file_df = pd.read_excel("""/home/extract/beauty/%(city)s/%(dir)s.xlsx""" %{"city":city,"dir":dir})
    sqlContest = SQLContext(sc)
    spark_df = sqlContest.createDataFrame(read_file_df)
    spark_df.registerTempTable("tmp_table" )
    sql="""select
                cast(msisdn as string) msisdn,
                cast('%(city_mean)s' as string) city
            from
                tmp_table"""%{"city_mean":city_mean}
    print sql
    df = sqlContext.sql(sql)
    df.repartition(10).write.mode("append").orc("/user/hive/warehouse/history.db/beauty/pt=%s" % pt)
    sqlContext.sql("alter table history.beauty drop if exists partition (pt=%s)"% pt)
    sqlContext.sql("alter table history.beauty add if not exists partition (pt=%s)"% pt)


# Driver: pull the 2018-12-07 batch (up to 120k msisdns) for Yichang into
# an xlsx, then load it back as the pt=20181207 partition of history.beauty
# tagged with the Chinese display name for Yichang ('宜昌市').
# NOTE(review): runs on import — no __main__ guard; intentional for a
# spark-submit script, presumably.
extrat('yichang','20181207_12w')
eachFile('yichang','宜昌市','20181207_12w','20181207')







